package org.xian.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Reactor with a worker thread pool.
 *
 * <p>One reactor thread (the thread executing {@link #run()}) performs every
 * accept, read and write on the non-blocking channels. Once a request has been
 * read, its processing is handed off to {@link #pool} so the reactor thread is
 * free to keep servicing other connections.
 */
public class WorkerThreadPoolsReactor implements Runnable {

    /** Worker pool that runs the request-processing step of every connection. */
    public static ExecutorService pool = Executors.newFixedThreadPool(5);

    /** Selector multiplexing the listening socket and all accepted connections. */
    final Selector selector;

    /** Non-blocking listening socket; its selection key carries an {@link Acceptor}. */
    final ServerSocketChannel serverSocket;

    /**
     * Opens the selector, binds the listening socket and registers it for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or the server socket cannot be opened or bound
     */
    public WorkerThreadPoolsReactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey.attach(new Acceptor());
    }

    /**
     * Event loop: waits for ready keys and dispatches each one to its attachment
     * until the current thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keys = selectionKeys.iterator();
                while (keys.hasNext()) {
                    dispatch(keys.next());
                }
                // The selected-key set is never cleared by the selector itself.
                selectionKeys.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the given key, if there is one.
     *
     * @param key a key taken from the selector's selected-key set
     */
    public void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }

    /** Accepts a pending connection and wires a {@link Handler} to it. */
    class Acceptor implements Runnable {

        @Override
        public void run() {
            SocketChannel s = null;
            try {
                s = serverSocket.accept();
                if (s != null) {
                    new Handler(selector, s);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // The Handler could not be set up: do not leak the accepted channel.
                if (s != null) {
                    try {
                        s.close();
                    } catch (IOException closeFailure) {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Per-connection state machine: READING (reactor thread) -> PROCESSING
     * (worker thread) -> SENDING (reactor thread) -> READING.
     */
    class Handler implements Runnable {
        static final int READING = 0;
        static final int SENDING = 1;
        static final int PROCESSING = 3;

        final SocketChannel socket;
        final SelectionKey sk;

        final Selector selector;

        /** Request bytes; kept in "fill" mode between calls to {@link #process()}. */
        ByteBuffer input = ByteBuffer.allocate(1024);

        /** Reply bytes; always left in "drain" mode (position = next byte to write). */
        ByteBuffer output = ByteBuffer.allocate(0);

        /** Written by the reactor thread and by worker threads, hence volatile. */
        volatile int stage = READING;

        /**
         * Switches the channel to non-blocking mode and registers it for reads.
         *
         * @param sel selector the connection is registered with
         * @param s   accepted client channel
         * @throws IOException if the channel cannot be configured or registered
         */
        public Handler(Selector sel, SocketChannel s) throws IOException {
            socket = s;
            selector = sel;
            socket.configureBlocking(false);
            sk = socket.register(sel, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            sel.wakeup();
        }

        /** Invoked by the reactor thread when the key is ready; performs the I/O for the current stage. */
        @Override
        public void run() {
            try {
                if (stage == READING) {
                    read();
                } else if (stage == SENDING) {
                    send();
                }
            } catch (IOException e) {
                e.printStackTrace();
                // A failed channel stays ready forever; drop it instead of spinning on it.
                closeConnection();
            }
        }

        /**
         * Reads available bytes and, if any are buffered, hands processing to the pool.
         *
         * @throws IOException if the read fails
         */
        synchronized void read() throws IOException {
            int count = socket.read(input);
            if (count < 0) {
                // Peer closed the connection.
                closeConnection();
                return;
            }
            if (inputIsComplete()) {
                stage = PROCESSING;
                // Stop read notifications while the worker owns the buffers, so the
                // selector does not wake up repeatedly for data we cannot consume yet.
                sk.interestOps(0);
                WorkerThreadPoolsReactor.pool.execute(new Processer());
            }
        }

        /**
         * Writes as much of the pending reply as the socket accepts; returns to
         * READING once the whole reply has been written.
         *
         * @throws IOException if the write fails
         */
        synchronized void send() throws IOException {
            // output is already in drain mode, so a partial write simply resumes
            // from the current position on the next OP_WRITE event.
            socket.write(output);
            if (outputIsComplete()) {
                stage = READING;
                sk.interestOps(SelectionKey.OP_READ);
            }
        }

        /**
         * Consumes everything buffered in {@code input}, logs it, and prepares the
         * reply {@code "Server Re:" + message} in {@code output}, ready to be written.
         */
        void process() {
            input.flip();
            byte[] bytes = new byte[input.remaining()];
            input.get(bytes);
            input.clear();
            String message = new String(bytes, StandardCharsets.UTF_8);
            System.out.println("Server Received:" + message);
            String request = "Server Re:" + message;
            // Size the reply buffer to the reply: prefix + request can exceed the
            // request buffer's capacity.
            output = ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8));
        }

        /** Worker-thread entry point: builds the reply, then asks the reactor to send it. */
        synchronized void processAndHandOff() {
            process();
            stage = SENDING;
            if (sk.isValid()) {
                sk.interestOps(SelectionKey.OP_WRITE);
                // Make the (possibly blocked) select() notice the new interest set.
                selector.wakeup();
            }
        }

        /** @return {@code true} when at least one request byte is buffered */
        boolean inputIsComplete() {
            return input.position() > 0;
        }

        /** @return {@code true} when no reply bytes are left to write */
        boolean outputIsComplete() {
            return !output.hasRemaining();
        }

        /** Cancels the key and closes the channel. */
        private void closeConnection() {
            sk.cancel();
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Pool task that runs {@link #processAndHandOff()} for this connection. */
        class Processer implements Runnable {

            @Override
            public void run() {
                processAndHandOff();
            }
        }
    }

    /**
     * Starts a reactor listening on port 8080.
     *
     * @param args ignored
     * @throws IOException if the reactor cannot be created
     */
    public static void main(String[] args) throws IOException {
        WorkerThreadPoolsReactor reactor = new WorkerThreadPoolsReactor(8080);
        new Thread(reactor).start();
    }
}
